package com.gsm.state;

import com.gsm.bean.WaterSensor;
import com.gsm.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * TODO 水位超过指定的阈值发送告警，阈值可以动态修改
 *
 * @author gsm
 * @version 1.0
 */
public class d8_OperatorBroadcastStateDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);


        // 数据流
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 7777)
                .map(new WaterSensorMapFunction());

        // 配置流（用来广播配置）
        DataStreamSource<String> configDS =
                env.socketTextStream("localhost", 8888);

        // TODO 1. 将 配置流 广播
        MapStateDescriptor<String, Integer> broadcastMapState =
                new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT); // <"threshold" , 10|11|...>

        BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);

        // TODO 2.把 数据流 和 广播后的配置流 connect
        BroadcastConnectedStream<WaterSensor, String> sensorBCS =
                sensorDS.connect(configBS);

        // TODO 3.调用 process
        sensorBCS
                .process(
                        new BroadcastProcessFunction<WaterSensor, String, String>() {

                            /**
                             * 数据流的处理方法： 数据流 只能 读取 广播状态，不能修改
                             * @param value
                             * @param ctx
                             * @param out
                             * @throws Exception
                             */
                            @Override
                            public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {

                                // TODO 5.通过上下文获取广播状态，取出里面的值（只读，不能修改）

                                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);

                                Integer threshold = broadcastState.get("threshold");

                                // 判断广播状态里是否有数据，因为刚启动时，可能是数据流的第一条数据先来
                                threshold = (threshold == null ? 0 : threshold);

                                if (value.getVc() > threshold) {
                                    out.collect(value + ",水位超过指定的阈值：" + threshold + "!!!");
                                }

                            }

                            /**
                             * 广播后的配置流的处理方法:  只有广播流才能修改 广播状态
                             * @param value
                             * @param ctx
                             * @param out
                             * @throws Exception
                             */
                            @Override
                            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {

                                // TODO 4. 通过上下文获取广播状态，往里面写数据
                                BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);

                                broadcastState.put("threshold", Integer.valueOf(value));

                            }
                        }

                )
                .print();

        env.execute();
    }
}

/**

 测试

 nc -lp 7777                 nc -lp   8888
 s1,1,1                                      WaterSensor{id='s1',ts=1,vc=1} 水位超过指定阈值: 0
 s1,2,2                                      WaterSensor{id='s1',ts=2,vc=2} 水位超过指定阈值: 0
 s1,3,3                                      WaterSensor{id='s1',ts=3,vc=3} 水位超过指定阈值: 0
 s1,4,4                                      WaterSensor{id='s1',ts=4,vc=4} 水位超过指定阈值: 0
 s1,5,5                                      WaterSensor{id='s1',ts=5,vc=5} 水位超过指定阈值: 0
                            10
 s1,6,11                                    WaterSensor{id='s1',ts=6,vc=11} 水位超过指定阈值: 10





 */